#
#  Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
import io
import json
import os
import random
import re
from functools import partial

import numpy as np
import trio
from PIL import Image

from api.db.services.file2document_service import File2DocumentService
from api.db.services.file_service import FileService
from api.db.services.llm_service import LLMBundle
from common import settings
from common.constants import LLMType
from common.misc_utils import get_uuid
from deepdoc.parser import ExcelParser
from deepdoc.parser.mineru_parser import MinerUParser
from deepdoc.parser.pdf_parser import PlainParser, RAGFlowPdfParser, VisionParser
from deepdoc.parser.tcadp_parser import TCADPParser
from rag.app.naive import Docx
from rag.flow.base import ProcessBase, ProcessParamBase
from rag.flow.parser.schema import ParserFromUpstream
from rag.llm.cv_model import Base as VLM
from rag.nlp import attach_media_context
from rag.utils.base64_image import image2id


class ParserParam(ProcessParamBase):
    def __init__(self):
        super().__init__()
        self.allowed_output_format = {
            "pdf": [
                "json",
                "markdown",
            ],
            "spreadsheet": [
                "json",
                "markdown",
                "html",
            ],
            "word": [
                "json",
                "markdown",
            ],
            "slides": [
                "json",
            ],
            "image": [
                "text",
            ],
            "email": [
                "text",
                "json",
            ],
            "text&markdown": [
                "text",
                "json",
            ],
            "audio": [
                "json",
            ],
            "video": [],
        }

        self.setups = {
            "pdf": {
                "parse_method": "deepdoc",  # deepdoc/plain_text/tcadp_parser/vlm
                "lang": "Chinese",
                "suffix": [
                    "pdf",
                ],
                "output_format": "json",
                "table_context_size": 0,
                "image_context_size": 0,
            },
            "spreadsheet": {
                "parse_method": "deepdoc",  # deepdoc/tcadp_parser
                "output_format": "html",
                "suffix": [
                    "xls",
                    "xlsx",
                    "csv",
                ],
                "table_context_size": 0,
                "image_context_size": 0,
            },
            "word": {
                "suffix": [
                    "doc",
                    "docx",
                ],
                "output_format": "json",
                "table_context_size": 0,
                "image_context_size": 0,
            },
            "text&markdown": {
                "suffix": ["md", "markdown", "mdx", "txt"],
                "output_format": "json",
                "table_context_size": 0,
                "image_context_size": 0,
            },
            "slides": {
                "parse_method": "deepdoc",  # deepdoc/tcadp_parser
                "suffix": [
                    "pptx",
                    "ppt",
                ],
                "output_format": "json",
                "table_context_size": 0,
                "image_context_size": 0,
            },
            "image": {
                "parse_method": "ocr",
                "llm_id": "",
                "lang": "Chinese",
                "system_prompt": "",
                "suffix": ["jpg", "jpeg", "png", "gif"],
                "output_format": "text",
            },
            "email": {
                "suffix": [
                    "eml",
                    "msg",
                ],
                "fields": ["from", "to", "cc", "bcc", "date", "subject", "body", "attachments", "metadata"],
                "output_format": "json",
            },
            "audio": {
                "suffix": [
                    "da",
                    "wave",
                    "wav",
                    "mp3",
                    "aac",
                    "flac",
                    "ogg",
                    "aiff",
                    "au",
                    "midi",
                    "wma",
                    "realaudio",
                    "vqf",
                    "oggvorbis",
                    "ape",
                ],
                "output_format": "text",
            },
            "video": {
                "suffix": [
                    "mp4",
                    "avi",
                    "mkv",
                ],
                "output_format": "text",
            },
        }

    def check(self):
        pdf_config = self.setups.get("pdf", {})
        if pdf_config:
            pdf_parse_method = pdf_config.get("parse_method", "")
            self.check_empty(pdf_parse_method, "Parse method abnormal.")

            if pdf_parse_method.lower() not in ["deepdoc", "plain_text", "mineru", "tcadp parser"]:
                self.check_empty(pdf_config.get("lang", ""), "PDF VLM language")

            pdf_output_format = pdf_config.get("output_format", "")
            self.check_valid_value(pdf_output_format, "PDF output format abnormal.", self.allowed_output_format["pdf"])

        spreadsheet_config = self.setups.get("spreadsheet", "")
        if spreadsheet_config:
            spreadsheet_output_format = spreadsheet_config.get("output_format", "")
            self.check_valid_value(spreadsheet_output_format, "Spreadsheet output format abnormal.", self.allowed_output_format["spreadsheet"])

        doc_config = self.setups.get("word", "")
        if doc_config:
            doc_output_format = doc_config.get("output_format", "")
            self.check_valid_value(doc_output_format, "Word processer document output format abnormal.", self.allowed_output_format["word"])

        slides_config = self.setups.get("slides", "")
        if slides_config:
            slides_output_format = slides_config.get("output_format", "")
            self.check_valid_value(slides_output_format, "Slides output format abnormal.", self.allowed_output_format["slides"])

        image_config = self.setups.get("image", "")
        if image_config:
            image_parse_method = image_config.get("parse_method", "")
            if image_parse_method not in ["ocr"]:
                self.check_empty(image_config.get("lang", ""), "Image VLM language")

        text_config = self.setups.get("text&markdown", "")
        if text_config:
            text_output_format = text_config.get("output_format", "")
            self.check_valid_value(text_output_format, "Text output format abnormal.", self.allowed_output_format["text&markdown"])

        audio_config = self.setups.get("audio", "")
        if audio_config:
            self.check_empty(audio_config.get("llm_id"), "Audio VLM")

        video_config = self.setups.get("video", "")
        if video_config:
            self.check_empty(video_config.get("llm_id"), "Video VLM")

        email_config = self.setups.get("email", "")
        if email_config:
            email_output_format = email_config.get("output_format", "")
            self.check_valid_value(email_output_format, "Email output format abnormal.", self.allowed_output_format["email"])

    def get_input_form(self) -> dict[str, dict]:
        return {}


class Parser(ProcessBase):
    component_name = "Parser"

    def _pdf(self, name, blob):
        self.callback(random.randint(1, 5) / 100.0, "Start to work on a PDF.")
        conf = self._param.setups["pdf"]
        self.set_output("output_format", conf["output_format"])

        if conf.get("parse_method").lower() == "deepdoc":
            bboxes = RAGFlowPdfParser().parse_into_bboxes(blob, callback=self.callback)
        elif conf.get("parse_method").lower() == "plain_text":
            lines, _ = PlainParser()(blob)
            bboxes = [{"text": t} for t, _ in lines]
        elif conf.get("parse_method").lower() == "mineru":
            mineru_executable = os.environ.get("MINERU_EXECUTABLE", "mineru")
            mineru_api = os.environ.get("MINERU_APISERVER", "http://host.docker.internal:9987")
            pdf_parser = MinerUParser(mineru_path=mineru_executable, mineru_api=mineru_api)
            ok, reason = pdf_parser.check_installation()
            if not ok:
                raise RuntimeError(f"MinerU not found or server not accessible: {reason}. Please install it via: pip install -U 'mineru[core]'.")

            lines, _ = pdf_parser.parse_pdf(
                filepath=name,
                binary=blob,
                callback=self.callback,
                output_dir=os.environ.get("MINERU_OUTPUT_DIR", ""),
                delete_output=bool(int(os.environ.get("MINERU_DELETE_OUTPUT", 1))),
            )
            bboxes = []
            for t, poss in lines:
                box = {
                    "image": pdf_parser.crop(poss, 1),
                    "positions": [[pos[0][-1], *pos[1:]] for pos in pdf_parser.extract_positions(poss)],
                    "text": t,
                }
                bboxes.append(box)
        elif conf.get("parse_method").lower() == "tcadp parser":
            # ADP is a document parsing tool using Tencent Cloud API
            table_result_type = conf.get("table_result_type", "1")
            markdown_image_response_type = conf.get("markdown_image_response_type", "1")
            tcadp_parser = TCADPParser(
                table_result_type=table_result_type,
                markdown_image_response_type=markdown_image_response_type,
            )
            sections, _ = tcadp_parser.parse_pdf(
                filepath=name,
                binary=blob,
                callback=self.callback,
                file_type="PDF",
                file_start_page=1,
                file_end_page=1000,
            )
            bboxes = []
            for section, position_tag in sections:
                if position_tag:
                    # Extract position information from TCADP's position tag
                    # Format: @@{page_number}\t{x0}\t{x1}\t{top}\t{bottom}##
                    import re

                    match = re.match(r"@@([0-9-]+)\t([0-9.]+)\t([0-9.]+)\t([0-9.]+)\t([0-9.]+)##", position_tag)
                    if match:
                        pn, x0, x1, top, bott = match.groups()
                        bboxes.append(
                            {
                                "page_number": int(pn.split("-")[0]),  # Take the first page number
                                "x0": float(x0),
                                "x1": float(x1),
                                "top": float(top),
                                "bottom": float(bott),
                                "text": section,
                            }
                        )
                    else:
                        # If no position info, add as text without position
                        bboxes.append({"text": section})
                else:
                    bboxes.append({"text": section})
        else:
            vision_model = LLMBundle(self._canvas._tenant_id, LLMType.IMAGE2TEXT, llm_name=conf.get("parse_method"), lang=self._param.setups["pdf"].get("lang"))
            lines, _ = VisionParser(vision_model=vision_model)(blob, callback=self.callback)
            bboxes = []
            for t, poss in lines:
                for pn, x0, x1, top, bott in RAGFlowPdfParser.extract_positions(poss):
                    bboxes.append(
                        {
                            "page_number": int(pn[0]),
                            "x0": float(x0),
                            "x1": float(x1),
                            "top": float(top),
                            "bottom": float(bott),
                            "text": t,
                        }
                    )

        for b in bboxes:
            text_val = b.get("text", "")
            has_text = isinstance(text_val, str) and text_val.strip()
            layout = b.get("layout_type")
            if layout == "figure" or (b.get("image") and not has_text):
                b["doc_type_kwd"] = "image"
            elif layout == "table":
                b["doc_type_kwd"] = "table"

        table_ctx = conf.get("table_context_size", 0) or 0
        image_ctx = conf.get("image_context_size", 0) or 0
        if table_ctx or image_ctx:
            bboxes = attach_media_context(bboxes, table_ctx, image_ctx)

        if conf.get("output_format") == "json":
            self.set_output("json", bboxes)
        if conf.get("output_format") == "markdown":
            mkdn = ""
            for b in bboxes:
                if b.get("layout_type", "") == "title":
                    mkdn += "\n## "
                if b.get("layout_type", "") == "figure":
                    mkdn += "\n![Image]({})".format(VLM.image2base64(b["image"]))
                    continue
                mkdn += b.get("text", "") + "\n"
            self.set_output("markdown", mkdn)

    def _spreadsheet(self, name, blob):
        self.callback(random.randint(1, 5) / 100.0, "Start to work on a Spreadsheet.")
        conf = self._param.setups["spreadsheet"]
        self.set_output("output_format", conf["output_format"])

        parse_method = conf.get("parse_method", "deepdoc")

        # Handle TCADP parser
        if parse_method.lower() == "tcadp parser":
            table_result_type = conf.get("table_result_type", "1")
            markdown_image_response_type = conf.get("markdown_image_response_type", "1")
            tcadp_parser = TCADPParser(
                table_result_type=table_result_type,
                markdown_image_response_type=markdown_image_response_type,
            )
            if not tcadp_parser.check_installation():
                raise RuntimeError("TCADP parser not available. Please check Tencent Cloud API configuration.")

            # Determine file type based on extension
            if re.search(r"\.xlsx?$", name, re.IGNORECASE):
                file_type = "XLSX"
            else:
                file_type = "CSV"

            self.callback(0.2, f"Using TCADP parser for {file_type} file.")
            sections, tables = tcadp_parser.parse_pdf(
                filepath=name,
                binary=blob,
                callback=self.callback,
                file_type=file_type,
                file_start_page=1,
                file_end_page=1000,
            )

            # Process TCADP parser output based on configured output_format
            output_format = conf.get("output_format", "html")

            if output_format == "html":
                # For HTML output, combine sections and tables into HTML
                html_content = ""
                for section, position_tag in sections:
                    if section:
                        html_content += section + "\n"
                for table in tables:
                    if table:
                        html_content += table + "\n"

                self.set_output("html", html_content)

            elif output_format == "json":
                # For JSON output, create a list of text items
                result = []
                # Add sections as text
                for section, position_tag in sections:
                    if section:
                        result.append({"text": section})
                # Add tables as text
                for table in tables:
                    if table:
                        result.append({"text": table, "doc_type_kwd": "table"})

                table_ctx = conf.get("table_context_size", 0) or 0
                image_ctx = conf.get("image_context_size", 0) or 0
                if table_ctx or image_ctx:
                    result = attach_media_context(result, table_ctx, image_ctx)

                self.set_output("json", result)

            elif output_format == "markdown":
                # For markdown output, combine into markdown
                md_content = ""
                for section, position_tag in sections:
                    if section:
                        md_content += section + "\n\n"
                for table in tables:
                    if table:
                        md_content += table + "\n\n"

                self.set_output("markdown", md_content)
        else:
            # Default DeepDOC parser
            spreadsheet_parser = ExcelParser()
            if conf.get("output_format") == "html":
                htmls = spreadsheet_parser.html(blob, 1000000000)
                self.set_output("html", htmls[0])
            elif conf.get("output_format") == "json":
                self.set_output("json", [{"text": txt} for txt in spreadsheet_parser(blob) if txt])
            elif conf.get("output_format") == "markdown":
                self.set_output("markdown", spreadsheet_parser.markdown(blob))

    def _word(self, name, blob):
        self.callback(random.randint(1, 5) / 100.0, "Start to work on a Word Processor Document")
        conf = self._param.setups["word"]
        self.set_output("output_format", conf["output_format"])
        docx_parser = Docx()

        if conf.get("output_format") == "json":
            sections, tbls = docx_parser(name, binary=blob)
            sections = [{"text": section[0], "image": section[1]} for section in sections if section]
            sections.extend([{"text": tb, "image": None, "doc_type_kwd": "table"} for ((_, tb), _) in tbls])

            table_ctx = conf.get("table_context_size", 0) or 0
            image_ctx = conf.get("image_context_size", 0) or 0
            if table_ctx or image_ctx:
                sections = attach_media_context(sections, table_ctx, image_ctx)

            self.set_output("json", sections)
        elif conf.get("output_format") == "markdown":
            markdown_text = docx_parser.to_markdown(name, binary=blob)
            self.set_output("markdown", markdown_text)

    def _slides(self, name, blob):
        self.callback(random.randint(1, 5) / 100.0, "Start to work on a PowerPoint Document")

        conf = self._param.setups["slides"]
        self.set_output("output_format", conf["output_format"])

        parse_method = conf.get("parse_method", "deepdoc")

        # Handle TCADP parser
        if parse_method.lower() == "tcadp parser":
            table_result_type = conf.get("table_result_type", "1")
            markdown_image_response_type = conf.get("markdown_image_response_type", "1")
            tcadp_parser = TCADPParser(
                table_result_type=table_result_type,
                markdown_image_response_type=markdown_image_response_type,
            )
            if not tcadp_parser.check_installation():
                raise RuntimeError("TCADP parser not available. Please check Tencent Cloud API configuration.")

            # Determine file type based on extension
            if re.search(r"\.pptx?$", name, re.IGNORECASE):
                file_type = "PPTX"
            else:
                file_type = "PPT"

            self.callback(0.2, f"Using TCADP parser for {file_type} file.")

            sections, tables = tcadp_parser.parse_pdf(
                filepath=name,
                binary=blob,
                callback=self.callback,
                file_type=file_type,
                file_start_page=1,
                file_end_page=1000,
            )

            # Process TCADP parser output - PPT only supports json format
            output_format = conf.get("output_format", "json")
            if output_format == "json":
                # For JSON output, create a list of text items
                result = []
                # Add sections as text
                for section, position_tag in sections:
                    if section:
                        result.append({"text": section})
                # Add tables as text
                for table in tables:
                    if table:
                        result.append({"text": table, "doc_type_kwd": "table"})

                table_ctx = conf.get("table_context_size", 0) or 0
                image_ctx = conf.get("image_context_size", 0) or 0
                if table_ctx or image_ctx:
                    result = attach_media_context(result, table_ctx, image_ctx)

                self.set_output("json", result)
        else:
            # Default DeepDOC parser (supports .pptx format)
            from deepdoc.parser.ppt_parser import RAGFlowPptParser as ppt_parser

            ppt_parser = ppt_parser()
            txts = ppt_parser(blob, 0, 100000, None)

            sections = [{"text": section} for section in txts if section.strip()]

            # json
            assert conf.get("output_format") == "json", "have to be json for ppt"
            if conf.get("output_format") == "json":
                table_ctx = conf.get("table_context_size", 0) or 0
                image_ctx = conf.get("image_context_size", 0) or 0
                if table_ctx or image_ctx:
                    sections = attach_media_context(sections, table_ctx, image_ctx)
                self.set_output("json", sections)

    def _markdown(self, name, blob):
        from functools import reduce

        from rag.app.naive import Markdown as naive_markdown_parser
        from rag.nlp import concat_img

        self.callback(random.randint(1, 5) / 100.0, "Start to work on a markdown.")
        conf = self._param.setups["text&markdown"]
        self.set_output("output_format", conf["output_format"])

        markdown_parser = naive_markdown_parser()
        sections, tables, section_images = markdown_parser(
            name,
            blob,
            separate_tables=False,
            delimiter=conf.get("delimiter"),
            return_section_images=True,
        )

        if conf.get("output_format") == "json":
            json_results = []

            for idx, (section_text, _) in enumerate(sections):
                json_result = {
                    "text": section_text,
                }

                images = []
                if section_images and len(section_images) > idx and section_images[idx] is not None:
                    images.append(section_images[idx])
                if images:
                    # If multiple images found, combine them using concat_img
                    combined_image = reduce(concat_img, images) if len(images) > 1 else images[0]
                    json_result["image"] = combined_image

                json_results.append(json_result)

            table_ctx = conf.get("table_context_size", 0) or 0
            image_ctx = conf.get("image_context_size", 0) or 0
            if table_ctx or image_ctx:
                json_results = attach_media_context(json_results, table_ctx, image_ctx)

            self.set_output("json", json_results)
        else:
            self.set_output("text", "\n".join([section_text for section_text, _ in sections]))

    def _image(self, name, blob):
        from deepdoc.vision import OCR

        self.callback(random.randint(1, 5) / 100.0, "Start to work on an image.")
        conf = self._param.setups["image"]
        self.set_output("output_format", conf["output_format"])

        img = Image.open(io.BytesIO(blob)).convert("RGB")

        if conf["parse_method"] == "ocr":
            # use ocr, recognize chars only
            ocr = OCR()
            bxs = ocr(np.array(img))  # return boxes and recognize result
            txt = "\n".join([t[0] for _, t in bxs if t[0]])
        else:
            lang = conf["lang"]
            # use VLM to describe the picture
            cv_model = LLMBundle(self._canvas.get_tenant_id(), LLMType.IMAGE2TEXT, llm_name=conf["parse_method"], lang=lang)
            img_binary = io.BytesIO()
            img.save(img_binary, format="JPEG")
            img_binary.seek(0)

            system_prompt = conf.get("system_prompt")
            if system_prompt:
                txt = cv_model.describe_with_prompt(img_binary.read(), system_prompt)
            else:
                txt = cv_model.describe(img_binary.read())

        self.set_output("text", txt)

    def _audio(self, name, blob):
        import os
        import tempfile

        self.callback(random.randint(1, 5) / 100.0, "Start to work on an audio.")

        conf = self._param.setups["audio"]
        self.set_output("output_format", conf["output_format"])
        _, ext = os.path.splitext(name)
        with tempfile.NamedTemporaryFile(suffix=ext) as tmpf:
            tmpf.write(blob)
            tmpf.flush()
            tmp_path = os.path.abspath(tmpf.name)

            seq2txt_mdl = LLMBundle(self._canvas.get_tenant_id(), LLMType.SPEECH2TEXT)
            txt = seq2txt_mdl.transcription(tmp_path)

            self.set_output("text", txt)

    def _video(self, name, blob):
        self.callback(random.randint(1, 5) / 100.0, "Start to work on an video.")

        conf = self._param.setups["video"]
        self.set_output("output_format", conf["output_format"])

        cv_mdl = LLMBundle(self._canvas.get_tenant_id(), LLMType.IMAGE2TEXT, llm_name=conf["llm_id"])
        txt = cv_mdl.chat(system="", history=[], gen_conf={}, video_bytes=blob, filename=name)

        self.set_output("text", txt)

    def _email(self, name, blob):
        self.callback(random.randint(1, 5) / 100.0, "Start to work on an email.")

        email_content = {}
        conf = self._param.setups["email"]
        self.set_output("output_format", conf["output_format"])
        target_fields = conf["fields"]

        _, ext = os.path.splitext(name)
        if ext == ".eml":
            # handle eml file
            from email import policy
            from email.parser import BytesParser

            msg = BytesParser(policy=policy.default).parse(io.BytesIO(blob))
            email_content["metadata"] = {}
            # handle header info
            for header, value in msg.items():
                # get fields like from, to, cc, bcc, date, subject
                if header.lower() in target_fields:
                    email_content[header.lower()] = value
                # get metadata
                elif header.lower() not in ["from", "to", "cc", "bcc", "date", "subject"]:
                    email_content["metadata"][header.lower()] = value
            # get body
            if "body" in target_fields:
                body_text, body_html = [], []

                def _add_content(m, content_type):
                    def _decode_payload(payload, charset, target_list):
                        try:
                            target_list.append(payload.decode(charset))
                        except (UnicodeDecodeError, LookupError):
                            for enc in ["utf-8", "gb2312", "gbk", "gb18030", "latin1"]:
                                try:
                                    target_list.append(payload.decode(enc))
                                    break
                                except UnicodeDecodeError:
                                    continue
                            else:
                                target_list.append(payload.decode("utf-8", errors="ignore"))

                    if content_type == "text/plain":
                        payload = msg.get_payload(decode=True)
                        charset = msg.get_content_charset() or "utf-8"
                        _decode_payload(payload, charset, body_text)
                    elif content_type == "text/html":
                        payload = msg.get_payload(decode=True)
                        charset = msg.get_content_charset() or "utf-8"
                        _decode_payload(payload, charset, body_html)
                    elif "multipart" in content_type:
                        if m.is_multipart():
                            for part in m.iter_parts():
                                _add_content(part, part.get_content_type())

                _add_content(msg, msg.get_content_type())

                email_content["text"] = "\n".join(body_text)
                email_content["text_html"] = "\n".join(body_html)
            # get attachment
            if "attachments" in target_fields:
                attachments = []
                for part in msg.iter_attachments():
                    content_disposition = part.get("Content-Disposition")
                    if content_disposition:
                        dispositions = content_disposition.strip().split(";")
                        if dispositions[0].lower() == "attachment":
                            filename = part.get_filename()
                            payload = part.get_payload(decode=True).decode(part.get_content_charset())
                            attachments.append(
                                {
                                    "filename": filename,
                                    "payload": payload,
                                }
                            )
                email_content["attachments"] = attachments
        else:
            # handle msg file
            import extract_msg

            print("handle a msg file.")
            msg = extract_msg.Message(blob)
            # handle header info
            basic_content = {
                "from": msg.sender,
                "to": msg.to,
                "cc": msg.cc,
                "bcc": msg.bcc,
                "date": msg.date,
                "subject": msg.subject,
            }
            email_content.update({k: v for k, v in basic_content.items() if k in target_fields})
            # get metadata
            email_content["metadata"] = {
                "message_id": msg.messageId,
                "in_reply_to": msg.inReplyTo,
            }
            # get body
            if "body" in target_fields:
                email_content["text"] = msg.body[0] if isinstance(msg.body, list) and msg.body else msg.body
                if not email_content["text"] and msg.htmlBody:
                    email_content["text"] = msg.htmlBody[0] if isinstance(msg.htmlBody, list) and msg.htmlBody else msg.htmlBody
            # get attachments
            if "attachments" in target_fields:
                attachments = []
                for t in msg.attachments:
                    attachments.append(
                        {
                            "filename": t.name,
                            "payload": t.data.decode("utf-8"),
                        }
                    )
                email_content["attachments"] = attachments

        if conf["output_format"] == "json":
            self.set_output("json", [email_content])
        else:
            content_txt = ""
            for k, v in email_content.items():
                if isinstance(v, str):
                    # basic info
                    content_txt += f"{k}:{v}" + "\n"
                elif isinstance(v, dict):
                    # metadata
                    content_txt += f"{k}:{json.dumps(v)}" + "\n"
                elif isinstance(v, list):
                    # attachments or others
                    for fb in v:
                        if isinstance(fb, dict):
                            # attachments
                            content_txt += f"{fb['filename']}:{fb['payload']}" + "\n"
                        else:
                            # str, usually plain text
                            content_txt += fb
            self.set_output("text", content_txt)

    async def _invoke(self, **kwargs):
        function_map = {
            "pdf": self._pdf,
            "text&markdown": self._markdown,
            "spreadsheet": self._spreadsheet,
            "slides": self._slides,
            "word": self._word,
            "image": self._image,
            "audio": self._audio,
            "video": self._video,
            "email": self._email,
        }

        try:
            from_upstream = ParserFromUpstream.model_validate(kwargs)
        except Exception as e:
            self.set_output("_ERROR", f"Input error: {str(e)}")
            return

        name = from_upstream.name
        if self._canvas._doc_id:
            b, n = File2DocumentService.get_storage_address(doc_id=self._canvas._doc_id)
            blob = settings.STORAGE_IMPL.get(b, n)
        else:
            blob = FileService.get_blob(from_upstream.file["created_by"], from_upstream.file["id"])

        done = False
        for p_type, conf in self._param.setups.items():
            if from_upstream.name.split(".")[-1].lower() not in conf.get("suffix", []):
                continue
            await trio.to_thread.run_sync(function_map[p_type], name, blob)
            done = True
            break

        if not done:
            raise Exception("No suitable for file extension: `.%s`" % from_upstream.name.split(".")[-1].lower())

        outs = self.output()
        async with trio.open_nursery() as nursery:
            for d in outs.get("json", []):
                nursery.start_soon(image2id, d, partial(settings.STORAGE_IMPL.put, tenant_id=self._canvas._tenant_id), get_uuid())
